package com.easy.mq.client;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.commons.beanutils.PropertyUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.easy.mq.config.consumer.RocketConsumerConfig;
import com.easy.mq.config.producer.RocketProducerConfig;
import com.easy.mq.entry.JsonUtil;
import com.easy.mq.exception.MQException;
import com.easy.mq.result.RocketProducerMessage;
import com.easy.mq.result.RocketSendResult;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class RocketClient {

	private final Logger log = LoggerFactory.getLogger(RocketClient.class);

	private	RocketProducerConfig mqProducerConfig;
	
	private ExecutorService executor = Executors.newSingleThreadExecutor();

	public RocketClient(){ 
	}
	
	public RocketClient(RocketProducerConfig mqProducerConfig){ 
		this.mqProducerConfig = mqProducerConfig;		
	}
	
	public  RocketSendResult send(RocketProducerMessage message,boolean async) {
		try {
			if(async) {
				Future<RocketSendResult> future = executor.submit(new SendMessage(message));
				return future.get();
			}else {
				return send(message);
			}
		} catch (InterruptedException | ExecutionException e) {
			log.error(e.getMessage());
		}
		return null;
	}
	
	class SendMessage implements Callable<RocketSendResult>{
		
		private RocketProducerMessage message;
		
		public SendMessage(RocketProducerMessage message) {
			this.message = message;
		}
	
		@Override
		public RocketSendResult call() throws Exception {
			return send(message);
		}
		
	}
	
	public  RocketSendResult send(RocketProducerMessage message) throws MQException {
		DefaultMQProducer producer = null;
		try {
			if (message == null) {
				throw new MQException("The message is empty.");
			}

			if (StringUtils.isEmpty(message.getTopic())) {
				throw new MQException("The topic 'ConsumerId' property is null.");
			}

			if (StringUtils.isEmpty(message.getKeys())) {
				message.setKeys(String.valueOf(System.currentTimeMillis()));
			}
			if (mqProducerConfig.isEnableLog()) {
				log.debug("RocketProducerMessage : {} " ,JsonUtil.bean2Json(message));
			}
			
			producer = RocketProducerPool.getProducer(mqProducerConfig,message);
			SendResult sendResult =producer.send(message);
			if (mqProducerConfig.isEnableLog()) {
				log.info("SEND_RESULT_MSG : {} " , beanToJsonStr(sendResult));
			}
			RocketSendResult result = new RocketSendResult();
			PropertyUtils.copyProperties(result, sendResult);
			return result;
		} catch (Exception e) {
			log.error(e.getMessage(), e);
			RocketProducerPool.shutdown(producer,message);
		}finally {
			//RocketProducerPool.shutdown(producer);
		}
		return null;
	}
	

	/*public MQPushConsumer pushConsumer(RocketConsumerConfig mqConfig) {
		MQPushConsumer pushConsumer = new MQPushConsumer(mqConfig.getGroup());
		try {
			if(org.apache.commons.lang3.StringUtils.isEmpty(mqConfig.getInstanceName())) {
				mqConfig.setInstanceName(UUID.randomUUID().toString());
			}
			pushConsumer.setNamesrvAddr(mqConfig.getAddress());
			pushConsumer.setInstanceName(mqConfig.getInstanceName());
			pushConsumer.subscribe(mqConfig.getTopic(), "*");
			pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
			pushConsumer.setMessageModel(MessageModel.CLUSTERING);
			pushConsumer.setConsumeThreadMin(mqConfig.getConsumeThreadMin() <= 0 ? 10 : mqConfig.getConsumeThreadMin());
			pushConsumer
					.setConsumeThreadMax(mqConfig.getConsumeThreadMax() <= 0 ? 500 : mqConfig.getConsumeThreadMax());
			// 本地队列大小
			pushConsumer.setPullThresholdForQueue(
					mqConfig.getPullThresholdForQueue() <= 0 ? 1000 : mqConfig.getPullThresholdForQueue());
			// 消息拉取线程每隔多久拉取一次
			pushConsumer.setPullInterval(mqConfig.getPullInterval() <= 0 ? 0 : mqConfig.getPullInterval());
			// 监听器每次接受本地队列的消息是多少条
			pushConsumer.setConsumeMessageBatchMaxSize(
					mqConfig.getConsumeMessageBatchMaxSize() <= 0 ? 1 : mqConfig.getConsumeMessageBatchMaxSize());
			// 每隔一段时间将各个队列的消费进度存储到对应的broker上
			pushConsumer.setPersistConsumerOffsetInterval(
					mqConfig.getPersistConsumerOffsetInterval() <= 0 ? 5 : mqConfig.getPersistConsumerOffsetInterval());
		} catch (MQClientException e) {
			log.error(e.getMessage(), e);
		}
		
		return pushConsumer;
	}
*/
	public byte[] getMsg(RocketConsumerConfig sub, MessageExt messageExt) {
		if (messageExt == null) {
			return null;
		}
		if (sub.getTopic() != null && sub.getTopic().equals(messageExt.getTopic())) {
			return messageExt.getBody();
		}
		return null;
	}

	private String beanToJsonStr(Object bean) {
		try {
			ObjectMapper mapper = new ObjectMapper();
			return mapper.writeValueAsString(bean);
		} catch (JsonProcessingException e) {
			return "";
		}
	}

}
